package InvertedIndex;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndex {
    /** Custom FileInputFormat that produces (file name, line) records. **/
    public static class FileNameInputFormat extends FileInputFormat<Text, Text> {
        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                           TaskAttemptContext context) throws IOException, InterruptedException {
            // Hand each split to our reader, initialized up front so the
            // mapper sees it ready to iterate.
            FileNameRecordReader reader = new FileNameRecordReader();
            reader.initialize(split, context);
            return reader;
        }
    }

    /** Custom RecordReader: key = source file name, value = current line. **/
    public static class FileNameRecordReader extends RecordReader<Text, Text> {
        // Name of the file backing the current split; set once in initialize().
        String fileName;
        // Delegate that performs the actual line-by-line reading.
        LineRecordReader lrr = new LineRecordReader();

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            // Key is the file name, so the mapper can build <term#docid> pairs.
            return new Text(fileName);
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return lrr.getCurrentValue();
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext arg1)
                throws IOException, InterruptedException {
            lrr.initialize(arg0, arg1);
            fileName = ((FileSplit) arg0).getPath().getName();
        }

        @Override
        public void close() throws IOException {
            lrr.close();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return lrr.nextKeyValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lrr.getProgress();
        }
    }


    /**
     * Mapper: for every word that survives filtering (length >= 3, not a
     * stop word, not a number) emits <term#docid, 1>.
     */
    public static class InvertedIndexMapper extends
            Mapper<Text, Text, Text, IntWritable> {
        private Set<String> stopwords;
        private Path[] localFiles;
        // Regex matching every character that is NOT [0-9a-zA-Z_].
        private String pattern = "[^\\w]";

        public void setup(Context context) throws IOException, InterruptedException {
            stopwords = new TreeSet<String>();
            Configuration conf = context.getConfiguration();
            // Stop-word list shipped via the distributed cache (see main()).
            localFiles = DistributedCache.getLocalCacheFiles(conf);
            if (localFiles == null) {
                // No cache files registered: proceed with an empty stop list.
                return;
            }
            for (int i = 0; i < localFiles.length; i++) {
                // try-with-resources: the original leaked the reader.
                try (BufferedReader br =
                        new BufferedReader(new FileReader(localFiles[i].toString()))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        StringTokenizer itr = new StringTokenizer(line);
                        while (itr.hasMoreTokens()) {
                            stopwords.add(itr.nextToken());
                        }
                    }
                }
            }
        }

        /**
         * Returns true iff {@code str} is a decimal number.  The original
         * regex used an unescaped '.', which matches ANY character, so
         * tokens like "12a34" were wrongly treated as numbers and dropped.
         */
        private static boolean isNumber(String str) {
            String reg = "^[0-9]+(\\.[0-9]+)?$";
            return str.matches(reg);
        }

        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Thanks to FileNameRecordReader: key = file name, value = one line.
            String temp = new String();
            String line = value.toString().toLowerCase();
            // Replace every non-word character with a space before tokenizing.
            line = line.replaceAll(pattern, " ");
            StringTokenizer itr = new StringTokenizer(line);
            for (; itr.hasMoreTokens();) {
                temp = itr.nextToken();
                if (temp.length() >= 3 && !stopwords.contains(temp) && !isNumber(temp)) {
                    Text word = new Text();
                    word.set(temp + "#" + key);
                    context.write(word, new IntWritable(1));
                }
            }
        }
    }

    /** Combiner: locally sums the 1-counts the mapper emitted per <term#docid> key. **/
    public static class SumCombiner extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Add up the partial counts for this <term#docid> key.
            int total = 0;
            for (IntWritable count : values) {
                total += count.get();
            }
            result.set(total);
            context.write(key, result);
        }
    }

    /** Custom HashPartitioner: partitions <term#docid> keys by term only, so all postings of a term reach the same reducer. **/
    public static class NewPartitioner extends HashPartitioner<Text, IntWritable> {
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String term = new String();
            term = key.toString().split("#")[0]; // <term#docid>=>term
            return super.getPartition(new Text(term), value, numReduceTasks);
        }
    }

    public static class InvertedIndexReducer extends
            Reducer<Text, IntWritable, Text, Text> {
        private Text word1 = new Text();
        private Text word2 = new Text();
        String temp = new String();
        static Text CurrentItem = new Text(" ");
        static List<String> postingList = new ArrayList<String>();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            word1.set(key.toString().split("#")[0]+':');
            temp = key.toString().split("#")[1];
            for (IntWritable val : values) {
                sum += val.get();
            }
            word2.set( temp + "#" + sum);
            //下面这就就意味着需要这一行已经处理完了，需要进行输出了 在需要输出的时候，我们对其进行一个排序，这样看起来是最好的
            if (!CurrentItem.equals(word1) && !CurrentItem.equals(" ")) {
                StringBuilder out = new StringBuilder();
                long count = 0;
                //那么就需要在向out中添加之前来先进行一个排序
                List<String> tempList = postingList;
                HashMap sorting = new HashMap();
                for(String q: tempList){
                    String t = q.split("#")[0];
                    Integer m = Integer.parseInt(q.split("#")[1]);
                    sorting.put(t,m);
                }
                List<Map.Entry<String, Integer>> infoIds = new ArrayList<Map.Entry<String, Integer>>(sorting.entrySet());
                Collections.sort(infoIds, new Comparator<Map.Entry<String, Integer>>() {
                    public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                        return (o2.getValue() - o1.getValue());
                        //return (o1.getKey()).toString().compareTo(o2.getKey());
                    }
                });4efff
                for (int i = 0; i < infoIds.size(); i++) {
                    String key1 = infoIds.get(i).getKey().toString();
                    String value1 = infoIds.get(i).getValue().toString();
                    String out1 = key1 +"#"+ value1.toString();
                    out.append(out1);
                    if(i!=infoIds.size()-1){
                    out.append(",");
                    }
                    count = count + 1;
                }
                /*Iterator<Map.Entry> entries = sorting.entrySet().iterator();
                while (entries.hasNext()) {
                    Map.Entry entry = (Map.Entry) entries.next();
                    String key1 = (String) entry.getKey();
                    Integer value1 = (Integer) entry.getValue();
                    String out1 = key1 +"*"+ value1.toString();
                    out.append(out1);
                    out.append(",");
                    count = count + 1;
                    //System.out.println("Key = " + key + ", Value = " + value);
                }
                /*
                for (String p : postingList) {
                    out.append(p);
                    out.append(",");
                    count = count + 1;
                }*/
                //out.append("<total#" + count + ">.");
                if (count > 0)
                    context.write(CurrentItem, new Text(out.toString()));
                postingList = new ArrayList<String>();
            }
            CurrentItem = new Text(word1);
            postingList.add(word2.toString()); // 不断向postingList也就是文档名称中添加词表
        }

        // cleanup 一般情况默认为空，有了cleanup不会遗漏最后一个单词的情况

        public void cleanup(Context context) throws IOException,
                InterruptedException {
            StringBuilder out = new StringBuilder();
            long count = 0;
            List<String> tempList = postingList;
            HashMap sorting = new HashMap();
            for(String q: tempList){
                String t = q.split("#")[0];
                Integer m = Integer.parseInt(q.split("#")[1]);
                sorting.put(t,m);
            }
            List<Map.Entry<String, Integer>> infoIds = new ArrayList<Map.Entry<String, Integer>>(sorting.entrySet());
            Collections.sort(infoIds, new Comparator<Map.Entry<String, Integer>>() {
                public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                    return (o2.getValue() - o1.getValue());
                    //return (o1.getKey()).toString().compareTo(o2.getKey());
                }
            });
            for (int i = 0; i < infoIds.size(); i++) {
                String key1 = infoIds.get(i).getKey().toString();
                String value1 = infoIds.get(i).getValue().toString();
                String out1 = key1 +"#"+ value1.toString();
                out.append(out1);
                if(i!=infoIds.size()-1){
                out.append(",");
                }
                count = count + 1;
            }
            /*
            Iterator<Map.Entry> entries = sorting.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry entry = (Map.Entry) entries.next();
                String key1 = (String) entry.getKey();
                Integer value1 = (Integer) entry.getValue();
                String out1 = key1 +"#"+ value1.toString();
                out.append(out1);
                out.append(",");
                count = count + 1;
                //System.out.println("Key = " + key + ", Value = " + value);
            }
            /*
            for (String p : postingList) {
                out.append(p);
                out.append(",");
                count = count + 1;
            }*/
            //out.append("<total#" + count + ">.");
            if (count > 0)
                context.write(CurrentItem, new Text(out.toString()));
        }

    }

    /**
     * Job driver: wires the custom input format, mapper, combiner,
     * partitioner and reducer together and submits the job.
     *
     * @param args args[0] = HDFS input path, args[1] = HDFS output path
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            // Fail fast with a usage message instead of an
            // ArrayIndexOutOfBoundsException on the path lookups below.
            System.err.println("Usage: InvertedIndex <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // Ship the stop-word list to every mapper via the distributed cache.
        DistributedCache.addCacheFile(new URI(
                "hdfs://lxy191870098-master:9000/invertedindex/stop-word-list.txt"), conf);
        Job job = new Job(conf, "inverted index");
        job.setJarByClass(InvertedIndex.class);
        job.setInputFormatClass(FileNameInputFormat.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setPartitionerClass(NewPartitioner.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

